DiFlow

Toni Verbeiren

Step by step

Step 1 - Operate on a stream

// Step - 1
// Step 1: push the single value 1 through a `map` that adds one, then print it.
workflow step1 {
  Channel.from(1) \
    | map{ value -> value + 1 } \
    | view{ value -> value }
}

Step 2 - Operate on a stream in parallel

// Step - 2
// Step 2: apply the same increment to each of the values 1, 2 and 3 emitted by the channel.
workflow step2 {
  Channel.from( [ 1, 2, 3 ] ) \
    | map{ value -> value + 1 } \
    | view{ value -> value }
}

Step 3 - Operate on a stream using a process

// Step - 3
// Step 3: a native (exec:) process that adds one to every value it receives.
// Also reused by step21a.
process add {
    input:
        val(input)
    output:
        val(output)
    exec:
        def increment = 1
        output = input + increment
}
// Step 3: same as step 2, but the increment is performed by the `add` process.
workflow step3 {
  Channel.from( [ 1, 2, 3 ] ) \
    | add \
    | view{ result -> result }
}

Step 4 - How map is synchronous

// Step - 4
// Blocks the calling thread for 2000 ms, then hands back its argument unchanged.
def waitAndReturn(it) {
  sleep(2000)
  it
}
// Step 4: the value 2 is delayed by waitAndReturn inside a `map`, to show how the
// operator behaves when one element takes longer than the others.
workflow step4 {
  Channel.from( [ 1, 2, 3 ] ) \
    | map{ value -> value == 2 ? waitAndReturn(value) : value } \
    | map{ value -> value + 1 } \
    | view{ value -> value }
}

Step 5 - Introduce an ID

// Step - 5
// Step 5: like `add`, but each value travels with an ID. The ID is re-emitted
// (via string interpolation) next to the result so outputs can be matched to inputs.
process addTuple {
    input:
        tuple val(id), val(input)
    output:
        tuple val("${id}"), val(output)
    exec:
        def increment = 1
        output = input + increment
}
// Step 5: key each value by its string form and feed the [id, value] pairs to addTuple.
workflow step5 {
  Channel.from( [ 1, 2, 3 ] ) \
    | map{ value -> [ value.toString(), value ] } \
    | addTuple \
    | view{ pair -> pair }
}

Step 6 - Add a process parameter

// Step - 6
// Step 6: the amount to add (`term`) arrives with each event instead of being hard-coded.
process addTupleWithParameter {
    input:
        tuple val(id), val(input), val(term)
    output:
        tuple val("${id}"), val(output)
    exec:
        def sum = input + term
        output = sum
}
// Step 6: attach the constant term 10 to every [id, value] event.
workflow step6 {
  Channel.from( [ 1, 2, 3 ] ) \
    | map{ value -> [ value.toString(), value, 10 ] } \
    | addTupleWithParameter \
    | view{ result -> result }
}

Step 7 - Use a Map to store parameters

// Step - 7
// Step 7: parameters travel as a Map. config.operator selects the operation:
// "+" adds config.term, any other value subtracts it.
process addTupleWithMap {
    input:
        tuple val(id), val(input), val(config)
    output:
        tuple val("${id}"), val(output)
    exec:
        if (config.operator == "+") {
            output = input + config.term
        } else {
            output = input - config.term
        }
}
// Step 7: every event carries its own settings map (subtract 10).
workflow step7 {
  Channel.from( [ 1, 2, 3 ] ) \
    | map{ value -> [ value.toString(), value, [ "operator" : "-", "term" : 10 ] ] } \
    | addTupleWithMap \
    | view{ result -> result }
}

Step 8 - Use a Map with a process-key

// Step - 8
// Step 8: this process's settings are nested under its own name in the config map
// (config.addTupleWithProcessHash); operator "+" adds term, anything else subtracts it.
process addTupleWithProcessHash {
    input:
        tuple val(id), val(input), val(config)
    output:
        tuple val("${id}"), val(output)
    exec:
        def ownConfig = config.addTupleWithProcessHash
        if (ownConfig.operator == "+") {
            output = input + ownConfig.term
        } else {
            output = input - ownConfig.term
        }
}
// Step 8: wrap the subtract-10 settings under the key "addTupleWithProcessHash".
workflow step8 {
  Channel.from( [ 1, 2, 3 ] ) \
    | map{ value ->
      def settings = [ "operator" : "-", "term" : 10 ]
      return [ value.toString(), value, [ "addTupleWithProcessHash" : settings ] ]
    } \
    | addTupleWithProcessHash \
    | view{ result -> result }
}

Step 9 - Use a ConfigMap with a shell script

// Step - 9
// Step 9: same contract as addTupleWithProcessHash, but the arithmetic runs in a
// shell script (`expr`) and the result is captured from the task's stdout.
// Settings are read from config.addTupleWithProcessHashScript.
// The emitted stdout includes the newline written by `echo`.
process addTupleWithProcessHashScript {
  input:
    tuple val(id), val(input), val(config)
  output:
    tuple val("${id}"), stdout
  script:
    def thisConf = config.addTupleWithProcessHashScript
    def operator = thisConf.operator
    def term = thisConf.term
    // The operator is single-quoted so that e.g. "*" is passed to expr literally
    // instead of being glob-expanded by the shell; "+" and "-" behave as before.
    """
    echo \$( expr $input '$operator' $term )
    """
}
// Step 9: same events as step 8, with the settings nested under "addTupleWithProcessHashScript".
workflow step9 {
  Channel.from( [ 1, 2, 3 ] ) \
    | map{ value ->
      def settings = [ "operator" : "-", "term" : 10 ]
      return [ value.toString(), value, [ "addTupleWithProcessHashScript" : settings ] ]
    } \
    | addTupleWithProcessHashScript \
    | view{ result -> result }
}

Step 10 - Running a pipeline

// Step - 10
// Step 10, first stage: adds `term` to `input`. Both go through toInteger(), so
// string-typed values (e.g. an interpolated term from an earlier stage) are accepted.
// `term` is passed on (interpolated) for the next stage.
process process_step10a {
    input:
        tuple val(id), val(input), val(term)
    output:
        tuple val("${id}"), val(output), val("${term}")
    exec:
        def lhs = input.toInteger()
        def rhs = term.toInteger()
        output = lhs + rhs
}
// Step 10, second stage: subtracts `term` from `input` (both via toInteger()) and
// passes `term` on (interpolated).
process process_step10b {
    input:
        tuple val(id), val(input), val(term)
    output:
        tuple val("${id}"), val(output), val("${term}")
    exec:
        def lhs = input.toInteger()
        def rhs = term.toInteger()
        output = lhs - rhs
}
// Step 10: chain both processes; every value gets +10 and then -10.
workflow step10 {
  Channel.from( [ 1, 2, 3 ] ) \
    | map{ value -> [ value.toString(), value, 10 ] } \
    | process_step10a \
    | process_step10b \
    | view{ result -> result }
}

// Step - 10a
// Step 10a: like step 10, but the term is replaced by 5 between the two processes.
// The three consecutive maps are equivalent ways of doing that replacement
// (destructured closure parameters, implicit `it`, explicit parameter). They are
// kept side by side on purpose; the slide text that follows quotes two of them.
workflow step10a {
  Channel.from( [ 1, 2, 3 ] ) \
    | map{ el -> [ el.toString(), el, 10 ] } \
    | process_step10a \
    | map{ id, value, term -> [ id, value, 5 ] } \
    | map{ [ it[0], it[1], 5 ] } \
    | map{ x -> [ x[0], x[1], 5 ] } \
    | process_step10b \
    | view{ it }
}

```groovy
  ...
  | map{ x -> [ x[0], x[1], 5 ] } \
  ...
```

```groovy
  ...
  | map{ id, value, term -> [ id, value, 5 ] } \
  ...
```

Step 11 - A more generic process

// Step - 11
// Step 11: a generic arithmetic process. config.operator "+" adds config.term,
// anything else subtracts it; both operands go through toInteger().
// NOTE(review): config is re-emitted as an interpolated string, so downstream code
// must replace it (as the workflows here do) rather than read keys from it.
process process_step11 {
    input:
        tuple val(id), val(input), val(config)
    output:
        tuple val("${id}"), val(output), val("${config}")
    exec:
        def adding = (config.operator == "+")
        def lhs = input.toInteger()
        def rhs = config.term.toInteger()
        output = adding ? lhs + rhs : lhs - rhs
}
// Step 11: run the generic process_step11 twice, first with "+5", then with "-11".
// The first call now gets an explicit config (as in step11a). Previously it received
// an empty map, so process_step11 called toInteger() on a null config.term.
// NOTE: Nextflow DSL2 does not allow the same process to be invoked twice in one
// workflow; step11a works around this by including the process under two aliases.
workflow step11 {
  Channel.from( [ 1, 2, 3 ] ) \
    | map{ el -> [ el.toString(), el, [ : ] ] } \
    | map{ id, value, config ->
      [
        id,
        value,
        [ "term" : 5, "operator" : "+" ]
      ] } \
    | process_step11 \
    | map{ id, value, config ->
      [
        id,
        value,
        [ "term" : 11, "operator" : "-" ]
      ] } \
    | process_step11 \
    | view{ [ it[0], it[1] ] }
}

// Step - 11a
include { process_step11 as process_step11a } \
  from './examples/modules/step11.nf'
include { process_step11 as process_step11b } \
  from './examples/modules/step11.nf'
// Step 11a: process_step11 from ./examples/modules/step11.nf is included twice under
// different aliases, so each alias is invoked only once. The configs request "+5"
// for the first stage and "-11" for the second.
workflow step11a {
  Channel.from( [ 1, 2, 3 ] ) \
    | map{ n -> [ n.toString(), n, [ : ] ] } \
    | map{ key, n, ignored -> [ key, n, [ "term" : 5, "operator" : "+" ] ] } \
    | process_step11a \
    | map{ key, n, ignored -> [ key, n, [ "term" : 11, "operator" : "-" ] ] } \
    | process_step11b \
    | view{ [ it[0], it[1] ] }
}

Step 12 - Map/reduce in Nextflow

// Step - 12
// Step 12 (reduce side): `input` holds a list of values; the process emits their sum.
process process_step12 {
    input:
        tuple val(id), val(input), val(term)
    output:
        tuple val("${id}"), val(output), val("${term}")
    exec:
        def total = input.sum()
        output = total
}
// Step 12: map/reduce. Each value gets +10 in process_step10a; toList gathers all
// results into a single event, and process_step12 sums the values under the id "sum".
workflow step12 {
  Channel.from( [ 1, 2, 3 ] ) \
    | map{ n -> [ n.toString(), n, 10 ] } \
    | process_step10a \
    | toList \
    | map{ triplets -> [ "sum", triplets.collect{ it[1] }, [ : ] ] } \
    | process_step12 \
    | view{ [ it[0], it[1] ] }
}

Step 13 - Files as input/output

// Step - 13
// Step 13: file in, file out. The shell script reads the staged `input` file into
// `a`, adds config.term and writes the result to output.txt, which is emitted
// together with the id and the (interpolated) config.
// NOTE(review): config.operator is never consulted; the script always adds, even
// though the step13 workflow passes "operator" : "-".
process process_step13 {
  input:
    tuple val(id), file(input), val(config)
  output:
    tuple val("${id}"), file("output.txt"), val("${config}")
  script:
    """
    a=`cat $input`
    let result="\$a + ${config.term}"
    echo "\$result" > output.txt
    """
}
// Step 13: one event per file matching params.input, keyed by the file's base name.
workflow step13 {
  Channel.fromPath( params.input ) \
    | map{ src -> [ src.baseName.toString(), src, [ "operator" : "-", "term" : 10 ] ] } \
    | process_step13 \
    | view{ [ it[0], it[1] ] }
}

Step 14 - Publishing output

// Step - 14
// Same computation as step 13, but each result is also published to the "output"
// directory via publishDir. Step-specific names are used so that this step does not
// redefine process_step13 / step13.
// NOTE: every task writes output.txt, so results for several input files end up
// with the same name in output/; step 15 makes the paths unique.
process process_step14 {
  publishDir "output"
  input:
    tuple val(id), file(input), val(config)
  output:
    tuple val("${id}"), file("output.txt"), val("${config}")
  script:
    """
    a=`cat $input`
    let result="\$a + ${config.term}"
    echo "\$result" > output.txt
    """
}
workflow step14 {
  Channel.fromPath( params.input ) \
    | map{ el ->
      [
        el.baseName.toString(),
        el,
        [ "operator" : "-", "term" : 10 ]
      ]} \
    | process_step14 \
    | view{ [ it[0], it[1] ] }
}

Step 15 - Make output files/paths unique

// Step - 15
// Step 15: same arithmetic as process_step13, but the result is published under
// output/<config.id>/, so each input gets its own publish directory for output.txt.
// NOTE(review): config.operator is not used; the script always adds config.term.
process process_step15 {
    publishDir "output/${config.id}"
    input:
        tuple val(id), file(input), val(config)
    output:
        tuple val("${id}"), file("output.txt"), val("${config}")
    script:
        """
        a=`cat $input`
        let result="\$a + ${config.term}"
        echo "\$result" > output.txt
        """
}
// Step 15: like step 13, plus an "id" entry in the config (the file's base name)
// that process_step15 uses to build its publish directory.
workflow step15 {
    Channel.fromPath( params.input ) \
        | map{ src ->
            def settings = [ "id": src.baseName, "operator" : "-", "term" : 10 ]
            return [ src.baseName, src, settings ]
          } \
        | process_step15 \
        | view{ [ it[0], it[1] ] }
}

Step 16 - Where to put params?

Step 17 - Add the output file to params

// Step - 17
// Step 17: the output file name comes from params.output instead of being hard-coded;
// it is used both as the redirect target and as the declared output. Results are
// published to the "output" directory.
// NOTE(review): every task uses the same params.output name, so with several input
// files the published results collide in output/. config.operator is not used.
process process_step17 {
    publishDir "output"
    input:
        tuple val(id), file(input), val(config)
    output:
        tuple val("${id}"), file(params.output), val("${config}")
    script:
        """
        a=`cat $input`
        let result="\$a + ${config.term}"
        echo "\$result" > ${params.output}
        """
}
// Step 17: one event per input file, keyed by its base name, with id/operator/term settings.
workflow step17 {
    Channel.fromPath( params.input ) \
        | map{ src ->
            def settings = [ "id": src.baseName, "operator" : "-", "term" : 10 ]
            return [ src.baseName.toString(), src, settings ]
          } \
        | process_step17 \
        | view{ [ it[0], it[1] ] }
}

Step 18 - Add the output filename to the triplet

// Step - 18
// Step 18: the output file name is part of the event's config map (config.output),
// so each event can choose its own name; it is used both as the script's redirect
// target and as the declared output file. Results are published to "output".
// NOTE(review): config.operator is not used; the script always adds config.term.
process process_step18 {
    publishDir "output"
    input:
        tuple val(id), file(input), val(config)
    output:
        tuple val("${id}"), file("${config.output}"), val("${config}")
    script:
        """
        a=`cat $input`
        let result="\$a + ${config.term}"
        echo "\$result" > ${config.output}
        """
}
// Step 18: derive a per-input output name, output_from_<baseName>.txt, and pass it in the config.
workflow step18 {
    Channel.fromPath( params.input ) \
        | map{ src ->
            def base = src.baseName
            def settings = [
                "output"   : "output_from_${base}.txt",
                "id"       : base,
                "operator" : "-",
                "term"     : 10
            ]
            return [ base.toString(), src, settings ]
          } \
        | process_step18 \
        | view{ [ it[0], it[1] ] }
}

Step 19 - Use a closure

// Step - 19
// Closure mapping an input file to the name "<baseName>-out.txt"; used by process_step19.
def out_from_in = { inFile -> inFile.baseName + "-out.txt" }
// Step 19: the output file name is computed inside the script block with the
// out_from_in closure, then referenced by the output declaration.
// NOTE(review): `out` is assigned without `def`, presumably so that the output
// section can see it; confirm before turning it into a local.
// config.operator is not used; the script always adds config.term.
process process_step19 {
    publishDir "output"
    input:
        tuple val(id), file(input), val(config)
    output:
        tuple val("${id}"), file("${out}"), val("${config}")
    script:
        out = out_from_in(input)
        """
        a=`cat $input`
        let result="\$a + ${config.term}"
        echo "\$result" > ${out}
        """
}
// Step 19: events carry only id/operator/term; the output name is derived inside the process.
workflow step19 {
    Channel.fromPath( params.input ) \
        | map{ src ->
            def settings = [ "id": src.baseName, "operator" : "-", "term" : 10 ]
            return [ src.baseName.toString(), src, settings ]
          } \
        | process_step19 \
        | view{ [ it[0], it[1] ] }
}

Step 20 - The order of events in a stream

// Step - 20a
// Step 20: divides the first element of `input` by the second. Unlike a sum, the
// result therefore depends on the order in which the values were collected.
process process_step20 {
    input:
        tuple val(id), val(input), val(term)
    output:
        tuple val("${id}"), val(output), val("${term}")
    exec:
        def numerator = input[0]
        def denominator = input[1]
        output = numerator / denominator
}
// Step 20a: gather both results of process_step10a with toList and divide them.
// toList keeps the order in which results are emitted, which for process output is
// not guaranteed to match the input order; step 20b sorts first.
workflow step20a {
    Channel.from( [ 1, 2 ] ) \
        | map{ n -> [ n.toString(), n, 10 ] } \
        | process_step10a \
        | toList \
        | map{ results -> [ "sum", results.collect{ it[1] }, [ : ] ] } \
        | process_step20 \
        | view{ [ it[0], it[1] ] }
}

// Step - 20b
// Step 20b: as 20a, but the collected results are sorted by id before dividing,
// so the operands always arrive in the same order.
workflow step20b {
    Channel.from( [ 1, 2 ] ) \
        | map{ n -> [ n.toString(), n, 10 ] } \
        | process_step10a \
        | toSortedList{ left, right -> left[0] <=> right[0] } \
        | map{ results -> [ "sum", results.collect{ it[1] }, [ : ] ] } \
        | process_step20 \
        | view{ [ it[0], it[1] ] }
}

Step 21 - Is the triplet really necessary?

// Step - 21
// Step 21: a process with two separate val inputs, combined with `+`.
process process_step21 {
    input:
        val(in1)
        val(in2)
    output:
        val(out)
    exec:
        def combined = in1 + in2
        out = combined
}
// Step 21: feed two independent channels (five numbers, four letters) into one
// process and print the sorted results. Values are paired only by position in
// their channels; nothing in the events links a number to a letter.
workflow step21 {
    numbers_ = Channel.from( [1, 2, 3, 4, 5 ] )
    letters_ = Channel.from( ["a", "b", "c", "d" ] )
    process_step21(numbers_, letters_) | toSortedList | view
}

// Step - 21a
// Step 21a: as step 21, but the numbers first pass through the `add` process, so
// their arrival order at process_step21 is no longer fixed by the source channel.
workflow step21a {
    numbers_ = Channel.from( [1, 2, 3, 4, 5 ] ) | add
    letters_ = Channel.from( ["a", "b", "c", "d" ] )
    process_step21(numbers_, letters_) | toSortedList | view
}

Step 22 - Towards generic processes

// Step - 22
// Step 22: a generic process. The shell command (config.cli) and the output file
// name (config.output) both come from the event's config map; results are
// published to the "output" directory.
// NOTE(review): config.cli is executed verbatim, so it must only come from trusted settings.
process process_step22 {
    publishDir "output"
    input:
        tuple val(id), file(input), val(config)
    output:
        tuple val("${id}"), file("${config.output}"), val("${config}")
    script:
        """
        ${config.cli}
        """
}
// Step 22: drive process_step22 with a command line that copies the staged input
// file to output22.txt.
// The input name comes from the file itself (el.name) rather than a hard-coded
// "input.txt": `file(input)` stages the file under its own name, so the fixed name
// only worked when the matched file happened to be called input.txt.
// NOTE(review): every task still writes output22.txt into the shared output/ directory.
workflow step22 {
    Channel.fromPath( params.input ) \
        | map{ el -> [
            el.baseName.toString(),
            el,
            [
                "cli": "cat '${el.name}' > output22.txt",
                "output": "output22.txt"
            ]
          ]} \
        | process_step22 \
        | view{ [ it[0], it[1] ] }
}
//- - -

Step 23 - More than one input

Step 24 - A workflow instead of a process

```groovy
process cellranger_process {
  ...
  container "${params.dockerPrefix}${container}"
  publishDir "${params.output}/processed_data/${id}/", mode: 'copy', overwrite: true
  input:
    tuple val(id), path(input), val(output), val(container), val(cli)
  output:
    tuple val("${id}"), path("${output}")
  script:
    """
    export PATH="${moduleDir}:\$PATH"
    $cli
    """
}
```

```groovy
workflow cellranger {
  take:
    id_input_params_
  main:
    ...
    result_ =  ...
  emit:
    result_
}
```

Step 25 - Custom scripts

Putting it all together

```groovy
nextflow.preview.dsl=2

import java.nio.file.Paths

include  plot_map       from  './target/nextflow/civ6_save_renderer/plot_map/main.nf'       params(params)
include  combine_plots  from  './target/nextflow/civ6_save_renderer/combine_plots/main.nf'  params(params)
include  convert_plot   from  './target/nextflow/civ6_save_renderer/convert_plot/main.nf'   params(params)
include  parse_header   from  './target/nextflow/civ6_save_renderer/parse_header/main.nf'   params(params)
include  parse_map      from  './target/nextflow/civ6_save_renderer/parse_map/main.nf'      params(params)
include  rename         from  './src/utils.nf'

workflow {

    // Print the full parameter map when --debug is set.
    if (params.debug == true)
        println(params)

    // params.input is required: abort with exit code 1 if it is missing or empty.
    if (!params.containsKey("input") || params.input == "") {
        exit 1, "ERROR: Please provide a --input parameter pointing to .Civ6Save file(s)"
    }

    // One event per file matching params.input.
    def input_ = Channel.fromPath(params.input)

    // Collapse a list of events into one "all" triplet. The triplet holds every
    // event's second element (presumably the produced file) and uses the global
    // params as its config.
    def listToTriplet = { it -> [ "all", it.collect{ a -> a[1] }, params ] }

    // Build [baseName, file, params] triplets and send them to parse_header and
    // parse_map (both fed by the `&` operator). The two outputs are joined on the id,
    // which yields [id, headerOut, params, mapOut, params]. That is reshaped into
    // [id, [yaml: headerOut, tsv: mapOut], params] for plot_map. After convert_plot
    // and rename, all results are sorted by id and merged into a single triplet for
    // combine_plots.
    // NOTE(review): the included modules are not visible here; the 5-way destructuring
    // after `join` assumes parse_header and parse_map each emit [id, output, params].
    input_ \
        | map{ it -> [ it.baseName , it ] } \
        | map{ it -> [ it[0] , it[1], params ] } \
        | ( parse_header & parse_map ) \
        | join \
        | map{ id, parse_headerOut, params1, parse_mapOut, params2 ->
            [ id, [ "yaml" : parse_headerOut, "tsv": parse_mapOut ], params1 ] } \
        | plot_map \
        | convert_plot \
        | rename \
        | toSortedList{ a,b -> a[0] <=> b[0] }  \
        | map( listToTriplet ) \
        | combine_plots

}
```

// reveal.js plugins